package com.markhsiu.minimq.broker.store.disk;

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.FileChannel.MapMode;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.markhsiu.minimq.core.constant.ByteUtils;

/**
 * Data block backing a topic's on-disk storage.
 * <p>
 * A block is a file mapped into memory for {@link #BLOCK_SIZE} bytes. Records
 * are stored back to back as a 4-byte length followed by the payload bytes;
 * the read/write cursors and counters are kept in the associated
 * {@link TopicDiskIndex}. An {@link #EOF} marker can be written at the current
 * write position via {@link #putEOF()}.
 * <p>
 * NOTE(review): instances created by {@link #duplicate()} share the same file,
 * channel and mapping as their source, so {@link #close()} on any one of them
 * releases resources the others still reference — confirm callers close a
 * block only once all duplicates are done with it.
 *
 * @author Mark Hsiu
 */
public class TopicDiskBlock {

	private static final Logger logger = LoggerFactory.getLogger(TopicDiskBlock.class);

	/** Size of the mapped region of every block file: 64MB. */
	public static final int BLOCK_SIZE = 64 * 1024 * 1024;

	/** Value written in place of a record length to mark the end of the block. */
	public static final int EOF = -1;

	/** Number of bytes used by a record's length prefix and by the EOF marker. */
	private static final int INT_BYTES = 4;

	private final String blockPath;
	private final TopicDiskIndex index;
	private final RandomAccessFile blockFile;
	private final FileChannel blockChannel;
	private MappedByteBuffer block;
	private ByteBuffer byteBuffer;

	/**
	 * Opens the block file and maps its first {@link #BLOCK_SIZE} bytes in
	 * read/write mode.
	 *
	 * @param fileDir  directory part of the block path; it is concatenated
	 *                 with {@code fileName} as-is, so it must already end with
	 *                 a path separator
	 * @param fileName name of the block file
	 * @param index    index holding the read/write positions for this block
	 * @throws IllegalArgumentException if the file cannot be opened or mapped;
	 *                                  the underlying failure is the cause
	 */
	public TopicDiskBlock(String fileDir, String fileName, TopicDiskIndex index) {
		this.index = index;
		this.blockPath = fileDir + fileName;

		RandomAccessFile file = null;
		try {
			file = new RandomAccessFile(new File(blockPath), "rw");
			FileChannel channel = file.getChannel();
			MappedByteBuffer mapped = channel.map(MapMode.READ_WRITE, 0, BLOCK_SIZE);
			mapped.load();

			this.blockFile = file;
			this.blockChannel = channel;
			this.block = mapped;
			this.byteBuffer = mapped;
		} catch (Exception e) {
			// Do not leak the file handle when mapping/loading fails.
			if (file != null) {
				try {
					file.close();
				} catch (IOException closeFailure) {
					e.addSuppressed(closeFailure);
				}
			}
			throw new IllegalArgumentException(e);
		}
	}

	/**
	 * Internal constructor used by {@link #duplicate()}: reuses the already
	 * opened file, channel and mapping with the given buffer view.
	 */
	private TopicDiskBlock(String blockPath, TopicDiskIndex index, RandomAccessFile blockFile,
			FileChannel blockChannel, MappedByteBuffer block, ByteBuffer byteBuffer) {
		this.blockPath = blockPath;
		this.index = index;
		this.blockFile = blockFile;
		this.blockChannel = blockChannel;
		this.block = block;
		this.byteBuffer = byteBuffer;
	}

	/**
	 * Creates another view of this block. The returned instance shares the
	 * index, file, channel and mapped content with this one but works on its
	 * own {@link ByteBuffer#duplicate() duplicate} of the buffer, so it has an
	 * independent buffer position.
	 *
	 * @return a new block object over the same underlying storage
	 */
	public TopicDiskBlock duplicate() {
		// The path is passed along so getBlockFilePath() also works on duplicates.
		return new TopicDiskBlock(this.blockPath, this.index, this.blockFile, this.blockChannel,
				this.block, this.byteBuffer.duplicate());
	}

	/**
	 * Writes the {@link #EOF} marker at the index's current write position.
	 * The index itself is not advanced.
	 */
	public void putEOF() {
		this.byteBuffer.position(index.getWritePosition());
		this.byteBuffer.putInt(EOF);
	}

	/**
	 * @return the path this block was opened with (directory and file name
	 *         concatenated)
	 */
	public String getBlockFilePath() {
		return blockPath;
	}

	/**
	 * Tells whether a record with a payload of {@code len} bytes still fits
	 * after the current write position.
	 *
	 * @param len payload length in bytes
	 * @return {@code true} if the length prefix, the payload and a trailing
	 *         EOF marker all fit inside {@link #BLOCK_SIZE}
	 */
	public boolean isSpaceAvailable(int len) {
		// Computed in long so a very large len cannot overflow int and wrongly
		// report free space. The extra INT_BYTES keeps room at the end of the
		// block for the EOF marker.
		long required = (long) len + INT_BYTES + index.getWritePosition() + INT_BYTES;
		return BLOCK_SIZE >= required;
	}

	/**
	 * Checks whether the next thing to read is the {@link #EOF} marker.
	 *
	 * @return {@code true} if the read position is greater than zero and the
	 *         int stored there equals {@link #EOF}; always {@code false} while
	 *         the read position is zero
	 */
	public boolean eof() {
		int readPosition = index.getReadPosition();
		return readPosition > 0 && byteBuffer.getInt(readPosition) == EOF;
	}

	/**
	 * Appends one record (4-byte length followed by the payload) at the
	 * index's write position, then advances the write position and increments
	 * the write count in the index.
	 * <p>
	 * This method does not check capacity itself; see
	 * {@link #isSpaceAvailable(int)}.
	 *
	 * @param bytes payload to store
	 * @return number of bytes consumed in the block ({@code bytes.length + 4})
	 */
	public int write(byte[] bytes) {
		int len = bytes.length;
		int increment = len + INT_BYTES;
		int writePosition = index.getWritePosition();
		byteBuffer.position(writePosition);
		byteBuffer.putInt(len);
		byteBuffer.put(bytes);
		index.putWritePosition(increment + writePosition);
		index.putWriteCount(index.getWriteCount() + 1);
		return increment;
	}

	/**
	 * Reads the record at the index's read position and, on success, advances
	 * the read position past it and increments the read count in the index.
	 *
	 * @return the record payload, or {@code null} if the reader has caught up
	 *         with the writer (same block number and read position not behind
	 *         the write position) or if the stored length is not positive
	 *         (which includes the {@link #EOF} marker)
	 */
	public byte[] read() {
		int readNum = index.getReadNum();
		int readPosition = index.getReadPosition();
		int writeNum = index.getWriteNum();
		int writePosition = index.getWritePosition();
		if (readNum == writeNum && readPosition >= writePosition) {
			return null;
		}
		byte[] bytes = readRecordAt(readPosition);
		if (bytes == null) {
			return null;
		}
		index.putReadPosition(readPosition + bytes.length + INT_BYTES);
		index.putReadCount(index.getReadCount() + 1);
		return bytes;
	}

	/**
	 * Reads the record stored at the given position without touching the
	 * index.
	 *
	 * @param readPosition offset in the block where the record's length prefix
	 *                     starts
	 * @return the record payload, or {@code null} if the stored length is not
	 *         positive
	 */
	public byte[] read(int readPosition) {
		return readRecordAt(readPosition);
	}

	/**
	 * Decodes one length-prefixed record starting at {@code position}.
	 *
	 * @return the payload, or {@code null} when the length prefix is zero or
	 *         negative
	 */
	private byte[] readRecordAt(int position) {
		byteBuffer.position(position);
		int dataLength = byteBuffer.getInt();
		if (dataLength <= 0) {
			return null;
		}
		byte[] bytes = new byte[dataLength];
		byteBuffer.get(bytes);
		return bytes;
	}

	/**
	 * Forces changes made to the mapped block out to the storage device.
	 * Does nothing once the block has been closed.
	 */
	public void sync() {
		if (block != null) {
			block.force();
		}
	}

	/**
	 * Flushes and releases the mapping, then closes the channel and the file.
	 * Calling it again on the same instance is a no-op. I/O failures while
	 * closing are logged rather than propagated.
	 */
	public void close() {
		MappedByteBuffer mapped = block;
		if (mapped == null) {
			return;
		}
		// Mark this instance closed up front so a failure below cannot leave
		// it pointing at a buffer whose file has already been closed.
		block = null;
		byteBuffer = null;
		try {
			mapped.force();
			ByteUtils.cleanMappedByte(mapped);
			blockChannel.close();
		} catch (IOException e) {
			logger.error("close fqueue block file failed", e);
		} finally {
			// Always release the file handle, even if closing the channel failed.
			try {
				blockFile.close();
			} catch (IOException e) {
				logger.error("close fqueue block file failed", e);
			}
		}
	}

}
